Skip to content

naneos.iotweb

NaneosUploadThread

Bases: Thread

Source code in src/naneos/iotweb/naneos_upload_thread.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class NaneosUploadThread(Thread):
    """Thread that performs a single upload of device data and reports the outcome.

    The data is serialized via ``create_proto_device`` / ``create_combined_entry``,
    base64-encoded, wrapped in a JSON body and POSTed to ``URL``.
    """

    URL: ClassVar[str] = "https://hg3zkburji.execute-api.eu-central-1.amazonaws.com/prod/proto/v1"
    HEADERS: ClassVar[dict] = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    def __init__(
        self,
        data: dict[int, pd.DataFrame],
        callback: Optional[Callable[[bool], None]],
    ) -> None:
        """Adding the data that should be uploaded to the database.

        Args:
            data (dict[int, pd.DataFrame]): Data to upload, where the key is the device serial number and the value is a DataFrame.
            callback (Optional[Callable[[bool], None]]): Callback function that is called after upload.
                It receives True when the server answered with status 200, False otherwise.
        """
        super().__init__()
        self.data = data
        self._callback = callback

    def run(self) -> None:
        """Upload ``self.data`` and report the result to the callback exactly once.

        Any exception raised while building or sending the upload is logged and
        reported as a failure. The callback is invoked outside the ``try`` so that
        an exception raised by the callback itself is neither mislabelled as an
        upload error nor answered with a second callback invocation.
        """
        try:
            success = self.upload(self.data).status_code == 200
        except Exception as e:
            logger.exception(f"Error in upload: {e}")
            # NOTE(review): the original comment stated the data is deleted by the
            # callback in this case "because it was corrupted" - confirm with caller.
            success = False

        if self._callback:
            self._callback(success)

    @staticmethod
    def get_body(upload_string: str) -> str:
        """Build the JSON request body for one upload.

        Args:
            upload_string (str): Payload placed in the ``data`` field (``upload``
                passes the base64-encoded protobuf message).

        Returns:
            str: JSON document with the keys ``gateway``, ``data`` and ``published_at``.
        """
        # Local import keeps this block self-contained; json.dumps escapes the
        # payload so the body is valid JSON for any input string.
        import json

        return json.dumps(
            {
                "gateway": "python_webhook",
                "data": upload_string,
                "published_at": datetime.datetime.now().isoformat(),
            }
        )

    @classmethod
    def upload(cls, data: dict[int, pd.DataFrame]) -> requests.Response:
        """Serialize the given frames and POST them to ``URL``.

        Args:
            data (dict[int, pd.DataFrame]): Mapping of device serial number to its DataFrame.

        Returns:
            requests.Response: The response of the POST request (10 s timeout).

        Note:
            An empty DataFrame raises at the ``df.index[0]`` check below; ``run``
            reports that as a failed upload.
        """
        abs_time = int(datetime.datetime.now().timestamp())
        devices = []

        for sn, df in data.items():
            # replace +inf / -inf with 0 (returns a new frame, the caller's is untouched)
            df = df.replace([float("inf"), -float("inf")], 0)

            # detect ms timestamp and convert to s
            if df.index[0] > 1e12:
                df.index = df.index / 1e3
                df.index = df.index.astype(int)

            devices.append(create_proto_device(sn, abs_time, df))

        combined_entry = create_combined_entry(devices=devices, abs_timestamp=abs_time)

        proto_str = combined_entry.SerializeToString()
        proto_str_base64 = base64.b64encode(proto_str).decode()

        body = cls.get_body(proto_str_base64)
        return requests.post(cls.URL, headers=cls.HEADERS, data=body, timeout=10)

__init__(data, callback)

Adding the data that should be uploaded to the database.

Parameters:

Name Type Description Default
data dict[int, DataFrame]

Data to upload, where the key is the device serial number and the value is a DataFrame.

required
callback Optional[Callable[[bool], None]]

Callback function that is called after upload.

required
Source code in src/naneos/iotweb/naneos_upload_thread.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def __init__(
    self,
    data: dict[int, pd.DataFrame],
    callback: Optional[Callable[[bool], None]],
) -> None:
    """Store the upload payload and the optional completion callback.

    Args:
        data (dict[int, pd.DataFrame]): Frames to upload, keyed by device serial number.
        callback (Optional[Callable[[bool], None]]): Function called after the upload; may be None.
    """
    super().__init__()
    self.data, self._callback = data, callback

download_from_iotweb(name, serial_number, start, stop, token)

Download your data from influxdb.naneos.ch. 1 Month of data takes about 30 seconds to download and uses about 100 MB of data.

You need to have a token to access the data. Ask mario.huegi@naneos.ch for your read token. We kindly ask you not to overuse our server. If you need to download the same data in a recurring pattern, contact us.

Parameters:

Name Type Description Default
name str

Name of the influx bucket.

required
serial_number str

Serial number of your device as string.

required
start datetime

Start date of the data you want to download.

required
stop datetime

End date of the data you want to download.

required
token str

Your read token. Do not push your token to public repositories.

required

Returns:

Type Description
DataFrame

pd.DataFrame: Dataframe with your data.

Source code in src/naneos/iotweb/download/downloader.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def download_from_iotweb(
    name: str, serial_number: str, start: dt.datetime, stop: dt.datetime, token: str
) -> pd.DataFrame:
    """Download your data from influxdb.naneos.ch.
    1 Month of data takes about 30 seconds to download and uses about 100 MB of data.

    You need to have a token to access the data.
    Ask mario.huegi@naneos.ch for your read token.
    We kindly ask you to not overuse our server.
    If you need to download the same data in a recurring pattern, contact us.

    Args:
        name (str): Name of the influx bucket.
        serial_number (str): Serial number of your device as string.
        start (dt.datetime): Start date of the data you want to download.
        stop (dt.datetime): End date of the data you want to download.
        token (str): Your read token. Do not push your token to public repositories.

    Returns:
        pd.DataFrame: Dataframe with your data, indexed by ``_time``. An empty
        DataFrame is returned when no query window yielded any rows.
    """
    timestamps = create_start_stop_timestamp(start, stop)

    frames: list[pd.DataFrame] = []

    with InfluxDBClient(url=URL_INFLUX, org=ORG_INFLUX, token=token) as client:
        # one query API object is enough for all windows
        query_api = client.query_api()

        for t1, t2 in timestamps:
            query = get_query(name, serial_number, t1, t2)

            result = query_api.query_data_frame(query)

            # the client may hand back a single frame or a list of frames
            if isinstance(result, list):
                frames.extend(result)
            elif isinstance(result, pd.DataFrame):
                frames.append(result)
            else:
                logger.warning(f"Unknown type: {type(result)}")

    # Windows without data produce empty frames that lack the "_time" column;
    # drop them so the post-processing below cannot fail on missing columns.
    frames = [frame for frame in frames if not frame.empty]
    if not frames:
        return pd.DataFrame()

    df = pd.concat(frames, axis=0)
    df.set_index("_time", inplace=True)
    df.drop(["result", "table"], axis=1, inplace=True)

    return df