Skip to content

partector2_pro_cs

Partector2ProCs

Bases: Partector2Pro

Source code in src/naneos/partector/partector2_pro_cs.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class Partector2ProCs(Partector2Pro):
    CS_OFF = 0
    CS_ON = 1
    CS_UNKNOWN = -1

    def __init__(
        self,
        serial_number: Optional[int] = None,
        port: Optional[str] = None,
        verb_freq: int = 1,
        **kwargs: Any,
    ) -> None:
        self._catalyst_state = self.CS_UNKNOWN
        self._auto_mode = True

        self._callback_catalyst = kwargs.get("callback_catalyst", None)
        if self._callback_catalyst is None:
            logger.error("No callback function for catalyst state given!")
            raise ValueError("No callback function for catalyst state given!")
        super().__init__(serial_number, port, verb_freq, "P2proCS")

    def _init_serial_data_structure(self) -> None:
        self.device_type = NaneosDeviceDataPoint.DEV_TYPE_P2PRO_CS
        self._data_structure = PARTECTOR2_PRO_CS_DATA_STRUCTURE_V315

    def _set_verbose_freq(self, freq: int) -> None:
        if freq == 0:
            self._write_line("X0000!")
        else:
            if self._fw >= 311:
                self._data_structure = PARTECTOR2_PRO_CS_DATA_STRUCTURE_V315

            self._write_line("h2001!")  # activates harmonics output
            self._write_line("M0004!")  # activates size dist mode
            self._write_line("X0006!")  # activates verbose mode

    def set_catalyst_state(self, state: str) -> None:
        """Sets the catalyst state to on, off or auto."""
        if not self._connected:
            return

        if state == "on":
            self._write_line("CSon!")
            self._cs_state = self.CS_ON
            self._auto_mode = False
        elif state == "off":
            self._write_line("CSoff!")
            self._cs_state = self.CS_OFF
            self._auto_mode = False
        elif state == "auto":
            self._write_line("CSauto!")
            self._auto_mode = True
        else:
            logger.warning(f"Unknown catalyst state: {state} -> nothing done.")
            return

        logger.info(f"Catalyst state set to {state}.")

    def _serial_reading_routine(self) -> None:
        line = self._read_line()

        if not line or line == "":
            return

        if "CS_on" in line:
            self._catalyst_state = self.CS_ON
            if self._callback_catalyst:
                self._callback_catalyst(True)
            self._mark_cs_change()
            return
        elif "CS_off" in line:
            self._catalyst_state = self.CS_OFF
            if self._callback_catalyst:
                self._callback_catalyst(False)
            self._mark_cs_change()
            return

        self._put_line_to_queue(line)

    def _mark_cs_change(self) -> None:
        try:
            last_line = self._queue.pop()
            last_line[-1] = f"1{last_line[-1]}"
            self._queue.append(last_line)
        except Exception as excep:
            logger.warning(f"Could not mark catalyst state change: {excep}")

    def _put_line_to_queue(self, line: str) -> None:
        unix_timestamp = int(datetime.now(tz=timezone.utc).timestamp())
        data = [unix_timestamp] + line.split("\t")

        self._notify_message_received()

        if len(data) != len(self._data_structure):
            self._queue_info.append(data)
            return

        state: Optional[int] = None
        try:
            state = int(data[-1])
        except Exception as excep:
            logger.warning(f"Could not parse catalyst state in backup function: {excep}")

        if state in [0, 1] and self._catalyst_state != state:
            if self._callback_catalyst:
                self._callback_catalyst(state == 1)
            self._catalyst_state = state
            logger.warning(f"Set catalyst state to {state} by backup function to.")

        self._queue.append(data)

set_catalyst_state(state)

Sets the catalyst state to on, off or auto.

Source code in src/naneos/partector/partector2_pro_cs.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def set_catalyst_state(self, state: str) -> None:
    """Sets the catalyst state to on, off or auto."""
    if not self._connected:
        return

    if state == "on":
        self._write_line("CSon!")
        self._cs_state = self.CS_ON
        self._auto_mode = False
    elif state == "off":
        self._write_line("CSoff!")
        self._cs_state = self.CS_OFF
        self._auto_mode = False
    elif state == "auto":
        self._write_line("CSauto!")
        self._auto_mode = True
    else:
        logger.warning(f"Unknown catalyst state: {state} -> nothing done.")
        return

    logger.info(f"Catalyst state set to {state}.")