Skip to content

partector_serial_manager

PartectorSerialManager

Bases: Thread

Source code in src/naneos/partector/partector_serial_manager.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class PartectorSerialManager(threading.Thread):
    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._stop_event = threading.Event()

        self._data: dict[int, pd.DataFrame] = {}

        self._connected_p1: dict[str, Partector1] = {}
        self._connected_p2: dict[str, Partector2] = {}
        self._connected_p2_pro: dict[str, Partector2Pro] = {}

    def get_data(self) -> dict[int, pd.DataFrame]:
        """Fetches the data from all connected devices and returns it."""
        self._fetch_data()
        data = self._data
        self._data = {}
        return data

    def _fetch_data(self):
        """Returns the data dictionary and deletes it."""
        for port in list(self._connected_p1.keys()):
            points = self._connected_p1[port].get_data()
            for point in points:
                self._data = NaneosDeviceDataPoint.add_data_point_to_dict(self._data, point)

        for port in list(self._connected_p2.keys()):
            points = self._connected_p2[port].get_data()
            for point in points:
                self._data = NaneosDeviceDataPoint.add_data_point_to_dict(self._data, point)

        for port in list(self._connected_p2_pro.keys()):
            points = self._connected_p2_pro[port].get_data()
            for point in points:
                self._data = NaneosDeviceDataPoint.add_data_point_to_dict(self._data, point)

    def stop(self) -> None:
        self._stop_event.set()

    def run(self) -> None:
        try:
            self._manager_loop()
        except RuntimeError as e:
            logger.exception(f"SerialManager loop exited with: {e}")

    def get_connected_device_strings(self) -> list[str]:
        """Returns a list of connected device strings."""
        p1_strings = [f"SN{p._sn} (P1)" for port, p in self._connected_p1.items()]
        p2_strings = [f"SN{p._sn} (P2)" for port, p in self._connected_p2.items()]
        p2_pro_strings = [f"SN{p._sn} (P2 Pro)" for port, p in self._connected_p2_pro.items()]

        return p1_strings + p2_strings + p2_pro_strings

    def get_connected_addresses(self) -> list[str]:
        p1_ports = list(self._connected_p1.keys())
        p2_ports = list(self._connected_p2.keys())
        p2_pro_ports = list(self._connected_p2_pro.keys())

        return p1_ports + p2_ports + p2_pro_ports

    def get_connected_serial_numbers(self) -> list[int | None]:
        p1_serials = [p._sn for p in self._connected_p1.values()]
        p2_serials = [p._sn for p in self._connected_p2.values()]
        p2_pro_serials = [p._sn for p in self._connected_p2_pro.values()]

        return p1_serials + p2_serials + p2_pro_serials

    def _manager_loop(self) -> None:
        while not self._stop_event.is_set():
            try:
                possible_ports = scan_for_serial_partectors(
                    ports_exclude=self.get_connected_addresses()
                )

                self._disconnect_unplugged_ports()
                self._connect_to_new_ports(possible_ports)

                self._fetch_data()  # Fetch data from all connected devices

                time.sleep(1.0)  # Sleep to avoid busy waiting

            except Exception as e:
                logger.exception(f"Error in serial manager loop: {e}")

        self._close_all_ports()

    def _disconnect_unplugged_ports(self) -> None:
        """Disconnects all ports that are not in the possible_ports dictionary."""
        # Disconnect P1 ports
        for port in list(self._connected_p1.keys()):
            if not self._connected_p1[port]._connected:
                self._connected_p1[port].close()
                self._connected_p1.pop(port, None)

        # Disconnect P2 ports
        for port in list(self._connected_p2.keys()):
            if not self._connected_p2[port]._connected:
                self._connected_p2[port].close()
                self._connected_p2.pop(port, None)

        # Disconnect P2 Pro ports
        for port in list(self._connected_p2_pro.keys()):
            if not self._connected_p2_pro[port]._connected:
                self._connected_p2_pro[port].close()
                self._connected_p2_pro.pop(port, None)

    def _connect_to_new_ports(self, possible_ports: dict[str, dict[int, str]]) -> None:
        p1_ports = possible_ports["P1"].values()
        p2_ports = possible_ports["P2"].values()
        p2pro_ports = possible_ports["P2pro"].values()

        for port in p1_ports:
            self._connected_p1[port] = Partector1(port=port)
        for port in p2_ports:
            self._connected_p2[port] = Partector2(port=port)
        for port in p2pro_ports:
            self._connected_p2_pro[port] = Partector2Pro(port=port)

    def _close_all_ports(self) -> None:
        for port in list(self._connected_p1.keys()):
            self._connected_p1[port].close()
            self._connected_p1.pop(port, None)

        for port in list(self._connected_p2.keys()):
            self._connected_p2[port].close()
            self._connected_p2.pop(port, None)

        for port in list(self._connected_p2_pro.keys()):
            self._connected_p2_pro[port].close()
            self._connected_p2_pro.pop(port, None)

get_connected_device_strings()

Returns a list of connected device strings.

Source code in src/naneos/partector/partector_serial_manager.py
60
61
62
63
64
65
66
def get_connected_device_strings(self) -> list[str]:
    """Returns a list of connected device strings."""
    p1_strings = [f"SN{p._sn} (P1)" for port, p in self._connected_p1.items()]
    p2_strings = [f"SN{p._sn} (P2)" for port, p in self._connected_p2.items()]
    p2_pro_strings = [f"SN{p._sn} (P2 Pro)" for port, p in self._connected_p2_pro.items()]

    return p1_strings + p2_strings + p2_pro_strings

get_data()

Fetches the data from all connected devices and returns it.

Source code in src/naneos/partector/partector_serial_manager.py
27
28
29
30
31
32
def get_data(self) -> dict[int, pd.DataFrame]:
    """Fetches the data from all connected devices and returns it."""
    self._fetch_data()
    data = self._data
    self._data = {}
    return data