Source code for lories.data.util

# -*- coding: utf-8 -*-
"""
lories.data.util
~~~~~~~~~~~~~~~~


"""

from __future__ import annotations

import hashlib
import re
from copy import copy, deepcopy
from typing import Optional, Tuple

import numpy as np
import pandas as pd
import pytz as tz
from pandas.tseries.frequencies import to_offset

# FIXME: Remove this fallback once Python >= 3.8 is a requirement (typing.Literal was added in 3.8)
try:
    from typing import Literal

except ImportError:
    from typing_extensions import Literal


# noinspection PyShadowingBuiltins
def hash_data(
    data: pd.DataFrame,
    method: Literal["MD5", "SHA1", "SHA256", "SHA512"] = "MD5",
    encoding: str = "UTF-8",
) -> str:
    """
    Compute a checksum over the index and the values of a data frame.

    The frame is serialized to a single comma separated string, with the
    index as first field of every row, and the string is passed on to
    :func:`hash_value`. The passed frame itself is left untouched, as all
    conversions are applied to a deep copy.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame to hash. The index and all datetime columns are converted with
        ``tz_convert``, so they are expected to be timezone aware.
    method : {"MD5", "SHA1", "SHA256", "SHA512"}
        Name of the checksum algorithm, forwarded to :func:`hash_value`.
    encoding : str
        Encoding used to turn the serialized string into bytes.

    Returns
    -------
    str
        Hexadecimal digest of the serialized data.

    """
    # Unnamed indices have no usable column label, so fall back to a fixed one.
    # The label never ends up in the hash, as the CSV is written without header.
    index_column = data.index.name if data.index.name is not None else "index"
    data_columns = data.columns
    data = deepcopy(data)

    # Represent all timestamps as integers relative to the epoch in UTC, so the
    # hash does not depend on the timezone the data is expressed in.
    # NOTE(review): the division by 10**9 assumes nanosecond resolution.
    for column in data.select_dtypes(include=["datetime64", "datetimetz"]).columns:
        data[column] = data[column].dt.tz_convert(tz.UTC).view(np.int64) // 10**9

    data[index_column] = data.index.tz_convert(tz.UTC).view(np.int64) // 10**9
    data = data[[index_column, *data_columns]]

    csv = data.to_csv(index=False, header=False, sep=",", decimal=".", float_format="%.10g")
    # Collapse runs of separators left behind by empty fields and join all rows
    # into one line, so that empty fields do not contribute to the hash.
    csv = ",".join(re.sub(r",,+", ",", line).strip(",") for line in csv.splitlines())
    return hash_value(csv, method, encoding)


def hash_value(
    value: str,
    method: Literal["MD5", "SHA1", "SHA256", "SHA512"],
    encoding: str = "UTF-8",
) -> str:
    """
    Compute the hexadecimal checksum of a string.

    Parameters
    ----------
    value : str
        Text to hash.
    method : {"MD5", "SHA1", "SHA256", "SHA512"}
        Name of the checksum algorithm. The name is matched case-insensitively.
    encoding : str
        Encoding used to turn the text into bytes before hashing.

    Returns
    -------
    str
        Hexadecimal digest of the encoded text.

    Raises
    ------
    ValueError
        If the algorithm name is not one of the supported methods.

    """
    algorithm = method.lower()
    constructors = {
        "md5": hashlib.md5,
        "sha1": hashlib.sha1,
        "sha256": hashlib.sha256,
        "sha512": hashlib.sha512,
    }
    constructor = constructors.get(algorithm)
    if constructor is None:
        # The message reports the lower-cased name, not the name as passed in.
        raise ValueError(f"Invalid checksum method '{algorithm}'")
    return constructor(value.encode(encoding)).hexdigest()


# noinspection PyUnresolvedReferences
def resample(
    data: pd.DataFrame | pd.Series,
    freq: str,
    func: Literal["sum", "mean", "min", "max", "last"],
    offset: Optional[pd.Timedelta] = None,
) -> pd.DataFrame | pd.Series:
    """
    Resample time series data to a coarser frequency.

    The data is only aggregated if the frequency of its index is unknown or
    finer than the requested one. In that case, values are grouped into bins
    closed on the right and the resulting labels are shifted forward by one
    period. In any case, entries without any valid value are dropped from the
    result. The passed object is never modified.

    Parameters
    ----------
    data : pandas.DataFrame or pandas.Series
        Data to resample. The index needs to provide a ``freq`` attribute.
    freq : str
        Target frequency, in any form accepted by ``to_offset``.
    func : {"sum", "mean", "min", "max", "last"}
        Name of the aggregation applied to each bin.
    offset : pandas.Timedelta, optional
        Offset forwarded to ``resample`` to shift the bin edges.

    Returns
    -------
    pandas.DataFrame or pandas.Series
        New object of the same kind as the input. For frames, columns and rows
        containing only missing values are removed, for series all missing
        values are removed.

    Raises
    ------
    ValueError
        If the aggregation name is not supported.

    """
    if func not in ("sum", "mean", "min", "max", "last"):
        raise ValueError(f"Invalid resampling function '{func}'")
    freq = to_offset(freq)

    index_name = data.index.name
    index_freq = data.index.freq
    if index_freq is None and len(data.index) > 2:
        # Frequency inference needs at least three timestamps. It yields None
        # for irregular indices, which to_offset simply passes through.
        index_freq = to_offset(pd.infer_freq(data.index))

    if index_freq is None or index_freq < freq:
        resampled = data.resample(freq, closed="right", offset=offset)
        # The name was validated above, so it maps directly to the aggregation.
        data = getattr(resampled, func)()
        # Move the labels forward by one period, towards the end of each bin.
        data.index = data.index + freq

    # Drop without inplace=True: if nothing was resampled, data still is the
    # object of the caller, which must not be altered as a side effect.
    if isinstance(data, pd.DataFrame):
        data = data.dropna(axis="columns", how="all")
        data = data.dropna(axis="index", how="all")
    else:
        # A series has no columns axis and would raise a ValueError for it.
        data = data.dropna()
    data.index.name = index_name
    return data


def scale_power(name: str, power: float) -> Tuple[str, float]:
    """
    Scale a power value to a more readable unit.

    Values of at least 1e7 are divided by 1e6 and values of at least 1e4 are
    divided by 1e3. Smaller values are kept as they are. The result is always
    rounded to two decimals.

    Parameters
    ----------
    name : str
        Label of the value. When the value is scaled, every occurrence of
        "W" in the label is replaced with "MW" or "kW" respectively.
    power : float
        Power value to scale.

    Returns
    -------
    tuple of (str, float)
        The adjusted label and the scaled, rounded value.

    """
    # Ordered from the largest threshold down, so the first match wins.
    scales = ((1e7, 1e6, "MW"), (1e4, 1e3, "kW"))
    for threshold, divisor, unit in scales:
        if power >= threshold:
            return name.replace("W", unit), round(power / divisor, 2)
    return name, round(power, 2)


def scale_energy(name: str, energy: float) -> Tuple[str, float]:
    """
    Scale an energy value to a more readable unit.

    Values of at least 1e7 are divided by 1e6 and values of at least 1e4 are
    divided by 1e3. Smaller values are kept as they are. The result is always
    rounded to two decimals.

    Parameters
    ----------
    name : str
        Label of the value. When the value is scaled, every occurrence of
        "kWh" in the label is replaced with "GWh" or "MWh" respectively.
    energy : float
        Energy value to scale.

    Returns
    -------
    tuple of (str, float)
        The adjusted label and the scaled, rounded value.

    """
    # Ordered from the largest threshold down, so the first match wins.
    scales = ((1e7, 1e6, "GWh"), (1e4, 1e3, "MWh"))
    for threshold, divisor, unit in scales:
        if energy >= threshold:
            return name.replace("kWh", unit), round(energy / divisor, 2)
    return name, round(energy, 2)


def derive_by_hours(data: pd.Series) -> pd.Series:
    """
    Derive a data series by hours.

    Each value is replaced by its difference to the previous value, divided
    by the time elapsed since the previous index entry, expressed in hours.

    Parameters
    ----------
    data : pandas.Series
        Series with the data to be derived. The index is subtracted from a
        shifted copy of itself, so it needs to hold timestamps.

    Returns
    -------
    derived : pandas.Series
        Series with the derived data. Entries without a valid result, such as
        the first one, which has no predecessor, are dropped.

    """
    delta_value = data.astype("float64").diff()

    # Time elapsed between consecutive index entries, as fractional hours.
    delta_index = pd.Series(delta_value.index, index=delta_value.index)
    delta_index = (delta_index - delta_index.shift(1)) / np.timedelta64(1, "h")

    return pd.Series(delta_value / delta_index, index=data.index).dropna()