Skip to content

naneos_upload_thread

NaneosUploadThread

Bases: Thread

Source code in src/naneos/iotweb/naneos_upload_thread.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class NaneosUploadThread(Thread):
    URL: ClassVar[str] = "https://hg3zkburji.execute-api.eu-central-1.amazonaws.com/prod/proto/v1"
    HEADERS: ClassVar[dict] = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    def __init__(
        self,
        data: dict[int, pd.DataFrame],
        callback: Optional[Callable[[bool], None]],
    ) -> None:
        """Adding the data that should be uploaded to the database.

        Args:
            data (dict[int, pd.DataFrame]): Data to upload, where the key is the device serial number and the value is a DataFrame.
            callback (Optional[Callable[[bool], None]]): Callback function that is called after upload.
        """
        super().__init__()
        self.data = data
        self._callback = callback

    def run(self) -> None:
        try:
            ret = self.upload(self.data)

            if self._callback:
                if ret.status_code == 200:
                    self._callback(True)
                else:
                    self._callback(False)
        except Exception as e:
            logger.exception(f"Error in upload: {e}")
            if self._callback:
                self._callback(False)  # delete data because it was corrupted

    @staticmethod
    def get_body(upload_string: str) -> str:
        return f"""
            {{
                "gateway": "python_webhook",
                "data": "{upload_string}",
                "published_at": "{datetime.datetime.now().isoformat()}"
            }}
            """

    @classmethod
    def upload(cls, data: dict[int, pd.DataFrame]) -> requests.Response:
        abs_time = int(datetime.datetime.now().timestamp())
        devices = []

        for sn, df in data.items():
            # make all inf values in df_p2_pro 0
            df = df.replace([float("inf"), -float("inf")], 0)

            # detect ms timestamp and convert to s
            if df.index[0] > 1e12:
                df.index = df.index / 1e3
                df.index = df.index.astype(int)

            devices.append(create_proto_device(sn, abs_time, df))

        combined_entry = create_combined_entry(devices=devices, abs_timestamp=abs_time)

        proto_str = combined_entry.SerializeToString()
        proto_str_base64 = base64.b64encode(proto_str).decode()

        body = cls.get_body(proto_str_base64)
        r = requests.post(cls.URL, headers=cls.HEADERS, data=body, timeout=10)
        return r

__init__(data, callback)

Adding the data that should be uploaded to the database.

Parameters:

Name Type Description Default
data dict[int, DataFrame]

Data to upload, where the key is the device serial number and the value is a DataFrame.

required
callback Optional[Callable[[bool], None]]

Callback function that is called after upload.

required
Source code in src/naneos/iotweb/naneos_upload_thread.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def __init__(
    self,
    data: dict[int, pd.DataFrame],
    callback: Optional[Callable[[bool], None]],
) -> None:
    """Adding the data that should be uploaded to the database.

    Args:
        data (dict[int, pd.DataFrame]): Data to upload, where the key is the device serial number and the value is a DataFrame.
        callback (Optional[Callable[[bool], None]]): Callback function that is called after upload.
    """
    super().__init__()
    self.data = data
    self._callback = callback