Source code for lories.data.channels.channel

# -*- coding: utf-8 -*-
"""
lories.data.channels.channel
~~~~~~~~~~~~~~~~~~~~~~~~~~~~


"""

from __future__ import annotations

from collections import OrderedDict
from collections.abc import Callable
from typing import Any, Collection, Dict, List, Optional, Type

import pandas as pd
import pytz as tz
from lories._core._channel import ChannelState, _Channel  # noqa
from lories._core._data import DataContext, DataManager, _DataContext, _DataManager  # noqa
from lories._core.typing import Timestamp  # noqa
from lories.core import Resource, ResourceError
from lories.data.channels import ChannelConnector, ChannelConverter, Channels
from lories.util import parse_freq, to_timedelta

# FIXME: Remove this once Python >= 3.9 is a requirement
try:
    from typing import Literal

except ImportError:
    from typing_extensions import Literal


[docs] class Channel(_Channel, Resource): __context: _DataContext _timestamp: pd.Timestamp = pd.NaT _value: Optional[Any] = None _state: str | ChannelState = ChannelState.DISABLED logger: ChannelConnector connector: ChannelConnector converter: ChannelConverter # noinspection PyShadowingBuiltins def __init__( self, id: str = None, key: str = None, name: str = None, type: str | Type = None, context: DataManager = None, converter: ChannelConverter = None, connector: Optional[ChannelConnector] = None, logger: Optional[ChannelConnector] = None, **configs: Any, ) -> None: super().__init__(id=id, key=key, name=name, type=type, **configs) self.__context = self._assert_context(context) self.converter = self._assert_converter(converter) self.connector = self._assert_connector(connector) self.logger = self._assert_connector(logger) @classmethod def _assert_context(cls, context: DataManager) -> DataManager: if context is None or not isinstance(context, _DataManager): raise ResourceError(f"Invalid '{cls.__name__}' context: {type(context)}") return context @classmethod def _assert_converter(cls, converter: Optional[ChannelConverter]) -> ChannelConverter: if converter is None or not isinstance(converter, ChannelConverter): raise ResourceError(f"Invalid channel converter: {type(converter)}") return converter @classmethod def _assert_connector(cls, connector: Optional[ChannelConnector]) -> ChannelConnector: if connector is None: connector = ChannelConnector() elif not isinstance(connector, ChannelConnector): raise ResourceError(f"Invalid channel connector: {type(connector)}") return connector def _get_attrs(self) -> List[str]: return [ *super()._get_attrs(), "converter", "connector", "logger", "value", "state", "timestamp", ] # noinspection PyShadowingBuiltins def _get_vars(self) -> Dict[str, Any]: vars = super()._get_vars() vars["value"] = self.value vars["state"] = self.state vars["timestamp"] = self.timestamp return vars # noinspection PyShadowingBuiltins def __repr__(self) -> str: vars = OrderedDict(key=self.id) if self.is_valid(): vars["value"] = str(self.value) else: vars["state"] = str(self.state) vars["timestamp"] = str(self.timestamp) return f"{type(self).__name__}({', '.join(f'{k}={v}' for k, v in vars.items())})" @property def freq(self) -> Optional[str]: freq = self.get(next((k for k in ["freq", "frequency", "resolution"] if k in self), None), default=None) if freq is not None: freq = parse_freq(freq) return freq @property def timedelta(self) -> Optional[pd.Timedelta]: return to_timedelta(self.freq) @property def timestamp(self) -> pd.Timestamp: return self._timestamp @property def value(self) -> Optional[Any]: return self._value @value.setter def value(self, value) -> None: self._set(pd.Timestamp.now(tz.UTC).floor(freq="s"), value, ChannelState.VALID) @property def state(self) -> ChannelState | str: return self._state @state.setter def state(self, state) -> None: self._set(pd.Timestamp.now(tz.UTC).floor(freq="s"), None, state) def is_valid(self) -> bool: return self._is_valid(self.value, self.state) @staticmethod def _is_valid(value: Any, state: ChannelState) -> bool: return state == ChannelState.VALID and not Channel._is_empty(value) @staticmethod def _is_empty(value: Any) -> bool: if isinstance(value, Collection) and not isinstance(value, str) and not isinstance(value, bytes): return any(pd.isna(value)) return pd.isna(value) def set( self, timestamp: pd.Timestamp, value: Any, state: Optional[str | ChannelState] = ChannelState.VALID, ) -> None: value = self.converter(value) self._set(timestamp, value, state) # noinspection PyUnresolvedReferences def _set( self, timestamp: pd.Timestamp, value: Optional[Any], state: str | ChannelState, ) -> None: if not isinstance(timestamp, pd.Timestamp): raise ResourceError(f"Expected pandas Timestamp for '{self.id}', not: {type(value)}") self._timestamp = timestamp if self._is_empty(value) and state == ChannelState.VALID: raise ResourceError(f"Invalid value for valid state '{self.id}': {value}") self._value = value self._state = state # noinspection PyShadowingBuiltins, PyProtectedMember def _update( self, converter: Optional[Dict[str, Any] | str] = None, connector: Optional[Dict[str, Any] | str] = None, logger: Optional[Dict[str, Any] | str] = None, **configs: Any, ) -> None: if converter is not None: converter = Channel._build_member(converter, "converter") self.converter._update(**converter) if connector is not None: connector = Channel._build_member(connector, "connector") self.connector._update(**connector) if logger is not None: logger = Channel._build_member(logger, "connector") self.logger._update(**logger) super()._update(**configs) def _copy_args(self) -> Dict[str, Any]: arguments = super()._copy_args() arguments["context"] = self.__context # arguments["processors"] = self.processors.copy() arguments["converter"] = self.converter.copy() arguments["connector"] = self.connector.copy() arguments["logger"] = self.logger.copy() return arguments def copy(self) -> Channel: channel = super().copy() channel._timestamp = self._timestamp channel._value = self._value channel._state = self._state return channel def to_list(self) -> Channels: return Channels([self]) def to_configs(self) -> Dict[str, Any]: configs = super().to_configs() configs["converter"] = self.converter.to_configs() configs["connector"] = self.connector.to_configs() configs["logger"] = self.logger.to_configs() return configs def to_series(self, state: bool = False) -> pd.Series: if not self.is_valid() and state: return pd.Series(data=[self.state], index=[self.timestamp], name=self.key) return self.converter.to_series(self.value, self.timestamp, name=self.key) # noinspection PyProtectedMember def from_logger(self) -> Channel: channel = self.copy() channel._update(**self.logger._copy_configs()) return channel # noinspection PyShadowingBuiltins def has_logger(self, *ids: Optional[str]) -> bool: return self.logger.enabled and (any(self.logger.id == id for id in ids) if len(ids) > 0 else True) # noinspection PyShadowingBuiltins def has_connector(self, id: Optional[str] = None) -> bool: return self.connector.enabled and (self.connector.id == id if id is not None else True) # noinspection PyUnresolvedReferences def register( self, function: Callable[[pd.DataFrame], None], how: Literal["any", "all"] = "any", unique: bool = False, ) -> None: self.__context.register(function, self, how=how, unique=unique) # noinspection PyUnresolvedReferences def read( self, start: Optional[Timestamp] = None, end: Optional[Timestamp] = None, ) -> pd.DataFrame: return self.__context.read(self.to_list(), start, end) # noinspection PyUnresolvedReferences def write(self, data: pd.DataFrame | pd.Series | Any) -> None: if data is None: raise ResourceError(f"Invalid data to write '{self.id}': {data}") if isinstance(data, pd.Series): data.name = self.id data = data.to_frame() elif not isinstance(data, pd.DataFrame): data = pd.DataFrame(index=[pd.Timestamp.now(tz.UTC).floor(freq="s")], data=[data], columns=[self.id]) self.__context.write(data, self.to_list())