Source code for lories.connectors.tables

# -*- coding: utf-8 -*-
"""
lories.connectors.tables
~~~~~~~~~~~~~~~~~~~~~~~~


"""

from __future__ import annotations

import os
import re
from typing import Optional, Sequence

import pandas as pd
from lories.connectors import ConnectionError, Database, register_connector_type
from lories.typing import Configurations, Resources, Timestamp
from pandas import HDFStore


[docs] @register_connector_type("tables", "hdfstore") class HDFDatabase(Database): __store: HDFStore = None _store_dir: str _store_path: str _mode: str = "a" _columns_unique: bool = True _compression_level: int | None = None _compression_lib = None # noinspection PyTypeChecker def configure(self, configs: Configurations) -> None: super().configure(configs) store_dir = configs.get("path", default=None) if store_dir is not None: if "~" in store_dir: store_dir = os.path.expanduser(store_dir) if not os.path.isabs(store_dir): store_dir = os.path.join(configs.dirs.data, store_dir) else: store_dir = configs.dirs.data store_path = configs.get("path", default=None) if store_path is not None: if not os.path.isabs(store_path): store_path = os.path.join(store_dir, store_path) else: store_dir = os.path.dirname(store_path) else: store_file = configs.get("file", default=".store.h5") store_path = os.path.join(store_dir, store_file) self._store_dir = store_dir self._store_path = store_path self._mode = configs.get("mode", default="a") self._columns_unique = configs.get_bool("columns_unique", default=False) self._compression_level = configs.get_int("compression_level", None) self._compression_lib = configs.get("compression_lib", None) def is_connected(self) -> bool: return self.__store is not None and self.__store.is_open def connect(self, resources: Resources) -> None: super().connect(resources) if not os.path.isdir(self._store_dir): os.makedirs(self._store_dir, exist_ok=True) try: self.__store = HDFStore( self._store_path, mode=self._mode, complevel=self._compression_level, complib=self._compression_lib, ) self.__store.open() except IOError as e: raise ConnectionError(self, str(e)) def disconnect(self) -> None: super().disconnect() if self.__store is not None: self.__store.close() # noinspection PyTypeChecker, PyUnresolvedReferences def read( self, resources: Resources, start: Optional[Timestamp] = None, end: Optional[Timestamp] = None, ) -> pd.DataFrame: data = [] try: for group, group_resources in resources.groupby("group"): group_key = _format_key(group) if group_key not in self.__store: continue group_data = self.__store.select( group_key, where=_build_where(start, end), columns=self.__build_columns(group_resources), ) group_data = self.__extract_data(group_resources, group_data) if not group_data.empty: data.append(group_data) except IOError as e: raise ConnectionError(self, str(e)) if len(data) == 0: return pd.DataFrame() data = sorted(data, key=lambda d: min(d.index)) return pd.concat(data, axis="columns") # noinspection PyTypeChecker, PyUnresolvedReferences def read_first(self, resources: Resources) -> Optional[pd.DataFrame]: data = [] try: for group, group_resources in resources.groupby("group"): group_key = _format_key(group) if group_key not in self.__store: continue group_data = self.__store.select(group_key, stop=1, columns=self.__build_columns(group_resources)) group_data = self.__extract_data(group_resources, group_data) if not group_data.empty: data.append(group_data) except IOError as e: raise ConnectionError(self, str(e)) if len(data) == 0: return pd.DataFrame() data = sorted(data, key=lambda d: min(d.index)) return pd.concat(data, axis="columns") # noinspection PyTypeChecker, PyUnresolvedReferences def read_last(self, resources: Resources) -> Optional[pd.DataFrame]: data = [] try: for group, group_resources in resources.groupby("group"): group_key = _format_key(group) if group_key not in self.__store: continue group_data = self.__store.select(group_key, start=-1, columns=self.__build_columns(group_resources)) group_data = self.__extract_data(group_resources, group_data) if not group_data.empty: data.append(group_data) except IOError as e: raise ConnectionError(self, str(e)) if len(data) == 0: return pd.DataFrame() data = sorted(data, key=lambda d: min(d.index)) return pd.concat(data, axis="columns") def delete( self, resources: Resources, start: Optional[Timestamp] = None, end: Optional[Timestamp] = None, ) -> None: try: for group, group_resources in resources.groupby("group"): group_key = _format_key(group) if group_key not in self.__store: continue self.__store.remove(group_key) except IOError as e: raise ConnectionError(self, str(e)) def write(self, data: pd.DataFrame) -> None: try: for group, group_resources in self.resources.filter(lambda c: c.id in data.columns).groupby("group"): group_key = _format_key(group) group_data = data[group_resources.ids].dropna(axis="index", how="all").dropna(axis="columns", how="all") if not self._columns_unique: group_data.rename( columns={r.id: r.get("column", default=r.key) for r in group_resources}, inplace=True, ) if group_key not in self.__store: self.__store.put(group_key, group_data, format="table", encoding="UTF-8") else: self.__store.append(group_key, group_data, format="table", encoding="UTF-8") except IOError as e: raise ConnectionError(self, str(e)) def __build_columns(self, resources: Resources) -> Sequence[str]: if self._columns_unique: return [r.id for r in resources] return [r.get("column", default=r.key) for r in resources] def __extract_data(self, resources: Resources, data: pd.DataFrame) -> pd.DataFrame: data.dropna(axis="columns", how="all", inplace=True) if not self._columns_unique: return data.rename(columns={r.get("column", default=r.key): r.id for r in resources}) return data
def _format_key(key: str) -> str: key = re.sub(r"\W", "/", key).replace("_", "/").lower() return f"/{key}" def _build_where( start: Optional[Timestamp] = None, end: Optional[Timestamp] = None, ) -> Optional[str]: where = [] if start is not None: where.append(f'index>=Timestamp("{start.isoformat()}")') if end is not None: where.append(f'index<=Timestamp("{end.isoformat()}")') return " & ".join(where) if len(where) > 0 else None